package tcp

import (
	"context"
	"encoding/hex"
	"errors"
	"gitee.com/go-vulcanus/vulcanus"
	managev1 "gitee.com/go-vulcanus/vulcanus/api/devmanage/v1"
	"gitee.com/go-vulcanus/vulcanus/connection"
	"gitee.com/go-vulcanus/vulcanus/transport"
	"gitee.com/go-vulcanus/vulcanus/utils"
	"gitee.com/go-vulcanus/vulcanus/utils/conversion"
	"gitee.com/go-vulcanus/vulcanus/utils/date"
	"github.com/go-kratos/kratos/v2/log"
	"github.com/go-netty/go-netty"
	"github.com/go-netty/go-netty/codec"
	nettyUtils "github.com/go-netty/go-netty/utils"
)

type ServerOption func(*Server)

// Port with server port.
func Port(port uint32) ServerOption {
	return func(s *Server) {
		s.port = port
	}
}

type MessageHandler struct {
	codec   vulcanus.Codec
	handler vulcanus.DataHandler
	ctx     *transport.Context
}

func (h MessageHandler) HandleActive(ctx netty.ActiveContext) {
	log.Infof("[%s] new client connection", ctx.Channel().RemoteAddr())
}

func (h MessageHandler) HandleException(ctx netty.ExceptionContext, ex netty.Exception) {
	deviceCode := ""
	if ctx.Attachment() != nil {
		deviceCode = ctx.Attachment().(string)
	}
	log.Warnf("[%s] [%s] exception in connection, err: %s", ctx.Channel().RemoteAddr(), deviceCode, ex)
	if ex.Error() == "EOF" {
		ctx.Channel().Close(ex)
	}
}

func (h MessageHandler) HandleInactive(ctx netty.InactiveContext, ex netty.Exception) {
	deviceCode := ""
	if ctx.Attachment() != nil {
		deviceCode = ctx.Attachment().(string)
	}
	log.Infof("[%s] [%s] connection inactive, err: %s", ctx.Channel().RemoteAddr(), deviceCode, ex)
	if deviceCode != "" {
		metadata := &vulcanus.Metadata{
			DeviceCode: deviceCode,
			Time:       date.Now(),
			Type:       "连接信息",
			Data:       "设备断开连接",
		}
		h.ctx.D.Kp.SendMetadata(utils.ToJsonString(metadata))
		h.ctx.Sm.Delete(deviceCode)
	}
	// disconnected，the default processing is to close the connection
	ctx.HandleInactive(ex)
}

func (h MessageHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
	// leave it to the next handler
	//ctx.HandleRead(message)
	frame := nettyUtils.MustToBytes(message)
	if len(frame) == 0 {
		return
	}

	deviceMsg, err := h.codec.Decode(frame)
	if err != nil {
		log.Errorf("[%s] decode error [%s] metadata: [%s]", ctx.Channel().RemoteAddr(), err.Error(), hex.EncodeToString(frame))
		return
	}

	deviceCode := utils.If(ctx.Attachment() == nil, deviceMsg.DeviceCode, ctx.Attachment()).(string)

	// 获取设备配置信息
	request := &managev1.GetDeviceConfigRequest{DeviceCode: deviceCode}
	reply, err := h.ctx.D.M1.GetDeviceConfig(context.Background(), request)
	if err != nil {
		log.Errorf("[%s] get device conf err: %s", deviceCode, err.Error())
		return
	}

	if reply.Code != 200 {
		log.Warnf("[%s] [%s] not found device conf, metadata: [%s]", deviceCode, hex.EncodeToString(frame))
		ctx.Channel().Close(errors.New(deviceCode + " not found device conf"))
		return
	}
	deviceConfig := reply.Data

	// 设备连接第一条数据
	if ctx.Attachment() == nil {
		// 保存设备码到上下文中
		ctx.SetAttachment(deviceMsg.DeviceCode)
		metadata := &vulcanus.Metadata{
			DeviceCode: deviceMsg.DeviceCode,
			Time:       date.Now(),
			Type:       "连接信息",
			Data:       "设备连接",
		}
		h.ctx.D.Kp.SendMetadata(utils.ToJsonString(metadata))
	}
	deviceMsg.DeviceCode = ctx.Attachment().(string)
	deviceMsg.DeviceType = h.ctx.C.Device.DeviceType

	log.Infof("[%s] [%s] command: [%s] metadata: [%s]", deviceCode,
		ctx.Channel().RemoteAddr(), deviceMsg.Command, deviceMsg.Metadata)

	deviceMsg.DeviceChannel = &connection.DeviceChannel{DeviceCode: deviceCode, Channel: ctx.Channel()}
	// 保存设备会话信息
	h.ctx.Sm.Add(deviceMsg.DeviceChannel)

	deviceMsg.DeviceConfig = deviceConfig

	// 向下游发送设备原始数据
	metadata := &vulcanus.Metadata{
		DeviceCode: deviceMsg.DeviceCode,
		Time:       date.Now(),
		Type:       "设备数据",
		Data:       deviceMsg.Metadata,
	}
	h.ctx.D.Kp.SendMetadata(utils.ToJsonString(metadata))

	// 处理设备指令
	err = h.handler.Handle(h.ctx, deviceMsg)
	if err != nil {
		log.Errorf("[%s] handle error: %s", deviceCode, err.Error())
	}
}

// Server is an TCP server wrapper.
type Server struct {
	port uint32
	b    netty.Bootstrap
}

// NewServer creates a TCP server by options.
func NewServer(ctx *transport.Context, codec []codec.Codec, codec2 vulcanus.Codec, handler vulcanus.DataHandler, opts ...ServerOption) *Server {
	srv := &Server{}
	for _, o := range opts {
		o(srv)
	}

	var childInitializer = func(channel netty.Channel) {
		for _, c := range codec {
			channel.Pipeline().
				AddLast(c)
		}
		channel.Pipeline().
			AddLast(MessageHandler{
				ctx:     ctx,
				codec:   codec2,
				handler: handler,
			})
	}

	// create bootstrap & listening & accepting
	bootstrap := netty.NewBootstrap(netty.WithChildInitializer(childInitializer))
	srv.b = bootstrap
	return srv
}

// Start start the TCP server.
func (s *Server) Start(ctx context.Context) error {
	log.Infof("[TCP] server listening on [%d]", s.port)
	err := s.b.Listen(":" + conversion.IntToStr(int(s.port))).Sync()
	return err
}

// Stop stop the TCP server.
func (s *Server) Stop(ctx context.Context) error {
	log.Info("[TCP] server stopping")
	s.b.Shutdown()
	return nil
}
